/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentInputStreamWithPos;

import java.io.IOException;

/**
 * Serializer/deserializer used for conversion between key/namespace and skip list key.
 * It is not thread safe.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 */
class SkipListKeySerializer<K, N> {

	private final TypeSerializer<K> keySerializer;
	private final TypeSerializer<N> namespaceSerializer;
	private final ByteArrayOutputStreamWithPos outputStream;
	private final DataOutputViewStreamWrapper outputView;

	SkipListKeySerializer(
		TypeSerializer<K> keySerializer,
		TypeSerializer<N> namespaceSerializer) {
		this.keySerializer = keySerializer;
		this.namespaceSerializer = namespaceSerializer;
		this.outputStream = new ByteArrayOutputStreamWithPos();
		this.outputView = new DataOutputViewStreamWrapper(outputStream);
	}

	/**
	 * Serialize the key and namespace to bytes. The format is
	 * 	- int:    length of serialized namespace
	 * 	- byte[]: serialized namespace
	 * 	- int:    length of serialized key
	 * 	- byte[]: serialized key
	 */
	byte[] serialize(K key, N namespace) {
		// we know that the segment contains a byte[], because it is created
		// in the method below by wrapping a byte[]
		return serializeToSegment(key, namespace).getArray();
	}

	/**
	 * Serialize the key and namespace to bytes. The format is
	 * 	- int:    length of serialized namespace
	 * 	- byte[]: serialized namespace
	 * 	- int:    length of serialized key
	 * 	- byte[]: serialized key
	 */
	MemorySegment serializeToSegment(K key, N namespace) {
		outputStream.reset();
		try {
			// serialize namespace
			outputStream.setPosition(Integer.BYTES);
			namespaceSerializer.serialize(namespace, outputView);
		} catch (IOException e) {
			throw new RuntimeException("Failed to serialize namespace", e);
		}

		int keyStartPos = outputStream.getPosition();
		try {
			// serialize key
			outputStream.setPosition(keyStartPos + Integer.BYTES);
			keySerializer.serialize(key, outputView);
		} catch (IOException e) {
			throw new RuntimeException("Failed to serialize key", e);
		}

		final byte[] result = outputStream.toByteArray();
		final MemorySegment segment = MemorySegmentFactory.wrap(result);

		// set length of namespace and key
		segment.putInt(0, keyStartPos - Integer.BYTES);
		segment.putInt(keyStartPos, result.length - keyStartPos - Integer.BYTES);

		return segment;
	}

	/**
	 * Deserialize the namespace from the byte buffer which stores skip list key.
	 *
	 * @param memorySegment the memory segment which stores the skip list key.
	 * @param offset     the start position of the skip list key in the byte buffer.
	 * @param len        length of the skip list key.
	 */
	N deserializeNamespace(MemorySegment memorySegment, int offset, int len) {
		MemorySegmentInputStreamWithPos inputStream =
			new MemorySegmentInputStreamWithPos(memorySegment, offset, len);
		DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
		inputStream.setPosition(offset + Integer.BYTES);
		try {
			return namespaceSerializer.deserialize(inputView);
		} catch (IOException e) {
			throw new RuntimeException("deserialize namespace failed", e);
		}
	}

	/**
	 * Deserialize the partition key from the byte buffer which stores skip list key.
	 *
	 * @param memorySegment the memory segment which stores the skip list key.
	 * @param offset     the start position of the skip list key in the byte buffer.
	 * @param len        length of the skip list key.
	 */
	K deserializeKey(MemorySegment memorySegment, int offset, int len) {
		MemorySegmentInputStreamWithPos inputStream =
			new MemorySegmentInputStreamWithPos(memorySegment, offset, len);
		DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
		int namespaceLen = memorySegment.getInt(offset);
		inputStream.setPosition(offset + Integer.BYTES + namespaceLen + Integer.BYTES);
		try {
			return keySerializer.deserialize(inputView);
		} catch (IOException e) {
			throw new RuntimeException("deserialize key failed", e);
		}
	}

	/**
	 * Gets serialized key and namespace from the byte buffer.
	 *
	 * @param memorySegment the memory segment which stores the skip list key.
	 * @param offset     the start position of the skip list key in the byte buffer.
	 * @return tuple of serialized key and namespace.
	 */
	Tuple2<byte[], byte[]> getSerializedKeyAndNamespace(MemorySegment memorySegment, int offset) {
		// read namespace
		int namespaceLen = memorySegment.getInt(offset);
		MemorySegment namespaceSegment = MemorySegmentFactory.allocateUnpooledSegment(namespaceLen);
		memorySegment.copyTo(offset + Integer.BYTES, namespaceSegment, 0, namespaceLen);

		// read key
		int keyOffset = offset + Integer.BYTES + namespaceLen;
		int keyLen = memorySegment.getInt(keyOffset);
		MemorySegment keySegment = MemorySegmentFactory.allocateUnpooledSegment(keyLen);
		memorySegment.copyTo(keyOffset + Integer.BYTES, keySegment, 0, keyLen);

		return Tuple2.of(keySegment.getArray(), namespaceSegment.getArray());
	}

	/**
	 * Serialize the namespace to bytes.
	 */
	byte[] serializeNamespace(N namespace) {
		outputStream.reset();
		try {
			namespaceSerializer.serialize(namespace, outputView);
		} catch (IOException e) {
			throw new RuntimeException("serialize namespace failed", e);
		}
		return outputStream.toByteArray();
	}

	MemorySegment serializeNamespaceToSegment(N namespace) {
		return MemorySegmentFactory.wrap(serializeNamespace(namespace));
	}
}
